package simpledb.execution;

import simpledb.common.Utility;
import simpledb.transaction.TransactionAbortedException;
import simpledb.common.DbException;
import simpledb.storage.Tuple;
import simpledb.storage.TupleDesc;

import java.util.*;

/**
 * The Join operator implements the relational join operation.
 */
public class Join extends Operator {

    private static final long serialVersionUID = 1L;
    private final JoinPredicate predicate;
    private final OpIterator child1;
    private final OpIterator child2;
    private final TupleDesc tupleDesc;
    private Tuple child1Cur = null;

    /**
     * Constructor. Accepts two children to join and the predicate to join them
     * on
     *
     * @param p      The predicate to use to join the children
     * @param child1 Iterator for the left(outer) relation to join
     * @param child2 Iterator for the right(inner) relation to join
     */
    public Join(JoinPredicate p, OpIterator child1, OpIterator child2) {
        this.predicate = p;
        this.child1 = child1;
        this.child2 = child2;
        this.tupleDesc = TupleDesc.merge(child1.getTupleDesc(), child2.getTupleDesc());
    }

    public JoinPredicate getJoinPredicate() {
        // some code goes here
        return predicate;
    }

    /**
     * @return the field name of join field1. Should be quantified by
     * alias or table name.
     */
    public String getJoinField1Name() {
        // some code goes here
        return null;
    }

    /**
     * @return the field name of join field2. Should be quantified by
     * alias or table name.
     */
    public String getJoinField2Name() {
        // some code goes here
        return null;
    }

    /**
     * @see TupleDesc#merge(TupleDesc, TupleDesc) for possible
     * implementation logic.
     */
    public TupleDesc getTupleDesc() {
        // some code goes here
        return tupleDesc;
    }

    public void open() throws DbException, NoSuchElementException,
            TransactionAbortedException {
        super.open();
        child1.open();
        child2.open();
        // some code goes here
    }

    public void close() {
        super.close();
        child1.close();
        child2.close();
        // some code goes here
    }

    public void rewind() throws DbException, TransactionAbortedException {
        child1.rewind();
        child2.rewind();
        // some code goes here
    }

    /**
     * Returns the next tuple generated by the join, or null if there are no
     * more tuples. Logically, this is the next tuple in r1 cross r2 that
     * satisfies the join predicate. There are many possible implementations;
     * the simplest is a nested loops join.
     * <p>
     * Note that the tuples returned from this particular implementation of Join
     * are simply the concatenation of joining tuples from the left and right
     * relation. Therefore, if an equality predicate is used there will be two
     * copies of the join attribute in the results. (Removing such duplicate
     * columns can be done with an additional projection operator if needed.)
     * <p>
     * For example, if one tuple is {1,2,3} and the other tuple is {1,5,6},
     * joined on equality of the first column, then this returns {1,2,3,1,5,6}.
     *
     * @return The next matching tuple.
     * @see JoinPredicate#filter
     */
    protected Tuple fetchNext() throws TransactionAbortedException, DbException {
        // some code goes here
        while (child1Cur != null || child1.hasNext()) {
            if (child1Cur == null) {
                child1Cur = child1.next();
            }
            while (child2.hasNext()) {
                Tuple t2 = child2.next();
                if (!predicate.filter(child1Cur, t2)) {
                    continue;
                }
                return mergeTuple(child1Cur, t2, tupleDesc);
            }
            child2.rewind();
            child1Cur = null;
        }
        return null;
    }

    private static Tuple mergeTuple(Tuple t1, Tuple t2, TupleDesc retDesc) {
        Tuple ret = new Tuple(retDesc);
        for (int i = 0; i < t1.getTupleDesc().numFields(); i++) {
            ret.setField(i, t1.getField(i));
        }
        for (int i = 0; i < t2.getTupleDesc().numFields(); i++) {
            ret.setField(i + t1.getTupleDesc().numFields(), t2.getField(i));
        }
        return ret;
    }

    @Override
    public OpIterator[] getChildren() {
        // some code goes here
        return null;
    }

    @Override
    public void setChildren(OpIterator[] children) {
        // some code goes here
    }

}
